Cloud Composer(Airflow)でGCSTimeSpanFileTransformOperatorを使ってみる
はじめに
データアナリティクス事業本部のkobayashiです。
前回GoogleCloudのCloud Composer(Airflow)でGCSFileTransformOperatorを使ったファイル変換処理のエントリを書きました。まとめの中で複数ファイルの処理ではGCSTimeSpanFileTransformOperatorのほうが良さそうだとお伝えしたのでそちらも試してみます。
GCSTimeSpanFileTransformOperatorとは
前回ご紹介したGCSFileTransformOperatorではソースであるGCS上のファイルをローカルに一時的に保存してから変換スクリプトにてファイを変換し、変換後に再びGCSへアップロードするといった処理を行いました。GCSFileTransformOperatorに関してはGCSFileTransformOperatorの公式ドキュメント のパラメータの項目からもわかるようにsource_object
では任意のオブジェクトのパスを指定して与えなくてはならなく、ディレクトリのパスを指定して複数ファイルを処理することはできません。
複数ファイルの変換を行いたい場合はGCSTimeSpanFileTransformOperatorを使う必要があります。このオペレータはDAG実行の期間にGCS上の指定パスに追加・更新されたオブジェクトを検索し、それらのオブジェクトをローカルにダウンロードしてから変換スクリプトにて変換を行い、変換後に再びGCSへアップロードするといった処理を行います。
GCSFileTransformOperatorと違い注意しなくてはならないのが期間です。これはDAG開始時間から次のDAGがスケジュールされている時間までを意味しています。したがって、DAG実行前に作成・更新されたオブジェクトは処理対象から外れます。
GCSTimeSpanFileTransformOperatorの使い方
GCSTimeSpanFileTransformOperator( task_id="gcs_transform", source_bucket="{ソースバケット名}", source_prefix="{ソースオブジェクトのパスPrefix}", source_gcp_conn_id="{GCSのConnection}", destination_bucket="{出力バケット名}", destination_prefix="{出力オブジェクトのパスPrefix}", destination_gcp_conn_id="{GCSのConnection}", transform_script=['python', 'script.py', 10], )
- gcp_conn_id: Airflowの管理コンソールで設定したConnection名
- source_bucket: ソースオブジェクトのあるバケット
- source_object: ソースオブジェクトのパスPrefix
- Prefixなので
*
を使って複数オブジェクトを指定できる
- Prefixなので
- destination_bucket: 出力先のバケット
- destination_object: 出力先のオブジェクトパスPrefix
- transform_script: 変換するスクリプトをリスト形式で指定
transform_scriptの書き方ですがリスト形式で指定します。こちらもGCSFileTransformOperator同様にリストで渡した値をsubprocess.Popenで実行しているだけなのでPopenの使い方で記述すれば良いだけです。また変換スクリプトではPythonだけでなくシェルスクリプトなども使うことができます。
変換スクリプトの書き方としてはソースファイル群が保存されているローカルディレクトリが第一引数、アップロードされるディレクトリが第二引数として与えられます。また第三引数として期間の開始日時、第四引数として期間の終了日時が与えられます。
Cloud ComposerでGCSTimeSpanFileTransformOperatorを使ってみる
では実際にGCSTimeSpanFileTransformOperatorを使ってみたいと思います。処理の内容としてはGCS上にUTF-16でエンコードされたCSVがあり、これをUTF-8に変換するような前回と同じ処理を行います。
環境
- Composerバージョン: 2.0.28
- Airflowバージョン: 2.3.3
行う内容としては、AirflowでDAGを作成するdagファイルをPythonで記述します。変換ファイルは今回もUTF-16をUTF-8に変換するだけなのでお手軽にiconv
コマンドを使うシェルスクリプトを記述します。
ディレクトリ構成は以下になります。
Composer用のDAGファイルパス ├── transform_utf16_ts.py └── scripts └── transform_utf16_ts.sh
import os from datetime import datetime from airflow import DAG from airflow.providers.google.cloud.operators.gcs import ( GCSTimeSpanFileTransformOperator, ) from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator work_dir = os.path.dirname(os.path.abspath(__file__)) with DAG( dag_id="gcs2bq-transfer_json_ts", start_date=datetime.now(), schedule_interval=None, default_args={"retries": 1}, tags=["gcs2bq"], ) as dag: # DAG内のファイル更新時間になるようにファイルをCopyする copy_files = GCSToGCSOperator( task_id="copy_files", source_bucket="sample_bucket", source_object="user_events/20230101/7001000000*.json", destination_bucket="sample_bucket", destination_object="user_events_tgt/20230101/", ) # 実処理 gcs_transform = GCSTimeSpanFileTransformOperator( task_id="gcs_transform", source_bucket="sample_bucket", source_prefix="user_events_tgt/20230101", source_gcp_conn_id="google_cloud_storage_default", destination_bucket="sample_bucket", destination_prefix="user_events_tgt/20230101", destination_gcp_conn_id="google_cloud_storage_default", transform_script=["bash", f"{work_dir}/test_script/transform_utf16_ts.sh"], ) copy_files >> gcs_transform
GCSToGCSOperatorですが、GCSTimeSpanFileTransformOperatorでは先程説明した通りsourceパス配下のオブジェクトのうち、DAGが実行された日時以降に作成・更新されたオブジェクトが対象となるため、DAG中でオブジェクトをコピーしてオブジェクト作成日が期間に含まれるようにしています。
transform_utf16_ts.sh
#! /bin/bash find $1 -name "*.csv" | while read -r fname do iconv -f UTF-16 -t UTF-8 $fname > "utf8-$fname" done find $1 -name "utf8-*.csv" | xargs -I% mv % $2
ソースディレクトリは$1
で取得できアップロードディレクトリは$2
で取得できるので上記のような非常に簡単なワンライナーのコードになります。
GCSTimeSpanFileTransformOperatorの動きとしては
$1
に対象のパスからオブジェクトがダウンロードされる- スクリプトが実行される
$2
にあるファイルがdestinationにアップロードされる
といった流れになるため、2で行われるのがファイル個別のパスが与えられるのではなく、あくまでディレクトリが渡されることに注意してスクリプトを記述する必要があります。
あとはこれをCloud ComposerでTrigger DAGで実行すればGCS上のUTF-16のcsvファイル群がUTF-8のcsvファイルに変換されます。
まとめ
GoogleCloudのCloud Composer(Airflow)でGCSTimeSpanFileTransformOperatorを使ってGCS上のファイルを変換スクリプトで変換してみました。GCSFileTransformOperatorと違って複数オブジェクトを変換対象にできるため幅広い使い方ができるかと思います。
注意点としてはスクリプトに与えられるのがパスであるという点と期間内に作成・変更されたオブジェクトのみが対象になる点です。
最後まで読んで頂いてありがとうございました。